library(tidyverse)
library(fst)
library(lubridate)
library(data.table)
library(tictoc)

#' Drop each consumer's records from the first flagged quarter onward
#'
#' Rows are sorted by `cid` and `qtr`. A row is flagged when `VARNAME` is
#' anything other than "N" (presumably a deceased indicator -- confirm against
#' the data dictionary); a missing `VARNAME` is not flagged. The flag is then
#' carried forward within each `cid`, and only rows that are still unflagged
#' are kept, so a consumer loses the flagged quarter and every later one.
#'
#' @param dataset A data frame with at least the columns `cid`, `qtr` and
#'   `VARNAME`.
#' @return A data.table sorted by `cid` and `qtr` holding the retained rows.
#'   The helper column `dead_marker` stays in the result and is `NA` in every
#'   remaining row.
Remove_Deceased <- function(dataset) {

  DT <- data.table(dataset)

  # Both branches are doubles: `NA_real_` matches the type of `1`, so the call
  # does not depend on fifelse() coercing an integer NA.
  DT[, dead_marker := fifelse(VARNAME == "N", NA_real_, 1)]
  DT <- DT[order(cid, qtr)]

  # Last observation carried forward: once a consumer has been flagged, every
  # later quarter of that consumer is flagged as well.
  DT[, dead_marker := nafill(dead_marker, type = "locf"), by = cid]

  DT[is.na(dead_marker)]
}

#' Add a `consumer_category` column
#'
#' The category is the label of the first condition below that is TRUE for a
#' row: "CC", then "CD", then "CF". Rows for which no condition is TRUE
#' (including rows where the conditions evaluate to `NA`) get `NA`.
#'
#' @param dataset A data frame containing the `VARNAME` columns used in the
#'   conditions.
#' @return A data.table copy of `dataset` with the character column
#'   `consumer_category` added.
Assign_Consumer_Group <- function(dataset) {

  DT <- data.table(dataset)

  DT[, consumer_category := fcase(
    VARNAME + VARNAME + VARNAME + VARNAME + VARNAME == 0, "CC",
    (VARNAME + VARNAME > 0) & (VARNAME + VARNAME + VARNAME == 0), "CD",
    VARNAME + VARNAME + VARNAME > 0, "CF",
    default = NA_character_
  )]

  # Return the table explicitly, like the other helpers in this file. The
  # trailing `[]` clears data.table's print suppression that follows `:=`.
  DT[]
}



#' Add the forward-looking outcome `t_default`
#'
#' For every `cid` the quarters between its first and last observed `qtr` are
#' laid out on a complete 3-month grid (quarters without data become rows of
#' `NA`), so that eight consecutive rows always span eight quarters. On that
#' grid `p_total` (the sum of three `VARNAME` columns) is replaced by its sum
#' over the following eight quarters, excluding the current one, and
#' `t_default` is 1 when that sum is positive and 0 otherwise. `t_default` is
#' `NA` when any of the eight following quarters is missing or `NA`, e.g. in a
#' consumer's last eight quarters.
#'
#' @param dataset A data frame with `cid`, `qtr` (a date type accepted by
#'   `seq(..., "3 months")`) and the `VARNAME` columns summed below.
#' @return A data.table with the rows of `dataset` (matched on `cid` and `qtr`,
#'   in the original row order) and the added columns `p_total` and
#'   `t_default`. Note that the returned `p_total` is the forward-looking sum,
#'   not the same-quarter sum.
Create_Outcome_Variable <- function(dataset) {

  DT <- data.table(dataset)
  # Remember the original keys (and row order) before the grid is filled in
  DT_keys <- DT[, c("cid", "qtr")]

  DT[, p_total := VARNAME + VARNAME + VARNAME]

  # One row per cid and quarter from the first to the last observed quarter
  DT_keys_no_missing_qtr <- DT[
    , .(qtr = seq(min(qtr), max(qtr), "3 months")), by = cid
  ]

  setkey(DT, cid, qtr)
  setkey(DT_keys_no_missing_qtr, cid, qtr)

  # Exact-match join onto the grid; grid quarters without data become NA rows
  DT <- DT[DT_keys_no_missing_qtr, roll = 0]
  DT <- DT[order(cid, qtr)]

  # Left-aligned 8-row sum covers rows t..t+7; leading it by one row moves the
  # window to t+1..t+8, i.e. the eight quarters after the current one.
  DT[
    , p_total := shift(
      frollsum(p_total, n = 8, na.rm = FALSE, align = "left"),
      1, type = "lead"
    ),
    by = cid
  ]
  DT[, t_default := fifelse(p_total > 0, 1, 0)]

  # Keep only the originally observed rows. The join columns are named
  # explicitly so this does not rely on DT still carrying its key.
  DT[DT_keys, on = c("cid", "qtr"), nomatch = NULL]
}


#' Add the outcome `t_default` and a forward-looking open-account indicator
#'
#' For every `cid` the quarters between its first and last observed `qtr` are
#' laid out on a complete 3-month grid (quarters without data become rows of
#' `NA`). On that grid:
#'
#' * `p_total` (the sum of three `VARNAME` columns) is replaced by its sum over
#'   the following eight quarters, excluding the current one, and `t_default`
#'   is 1 when that sum is positive and 0 otherwise.
#' * `has_accounts` is 1 when `VARNAME > 0` and 0 otherwise.
#' * `has_accounts_2qtr_sum` is 1 when `has_accounts` summed over the current
#'   and the next quarter is positive.
#' * `open_acc_has_succesive_2qtr_within_8qtr` is 1 when
#'   `has_accounts_2qtr_sum` is positive for at least one of the seven quarter
#'   pairs that lie within the following eight quarters.
#'
#' All rolling quantities are `NA` when a quarter inside their window is
#' missing or `NA`.
#'
#' @param dataset A data frame with `cid`, `qtr` (a date type accepted by
#'   `seq(..., "3 months")`) and the `VARNAME` columns used below.
#' @return A data.table with the rows of `dataset` (matched on `cid` and `qtr`,
#'   in the original row order) and the added columns `p_total`, `t_default`,
#'   `has_accounts`, `has_accounts_2qtr_sum` and
#'   `open_acc_has_succesive_2qtr_within_8qtr`. Note that the returned
#'   `p_total` is the forward-looking sum, not the same-quarter sum.
Create_Outcome_and_Open_Account_Variable <- function(dataset) {

  DT <- data.table(dataset)
  # Remember the original keys (and row order) before the grid is filled in
  DT_keys <- DT[, c("cid", "qtr")]

  DT[, p_total := VARNAME + VARNAME + VARNAME]

  # One row per cid and quarter from the first to the last observed quarter
  DT_keys_no_missing_qtr <- DT[
    , .(qtr = seq(min(qtr), max(qtr), "3 months")), by = cid
  ]

  setkey(DT, cid, qtr)
  setkey(DT_keys_no_missing_qtr, cid, qtr)

  # Exact-match join onto the grid; grid quarters without data become NA rows.
  # The table stays on this grid for every rolling computation below, so the
  # join does not need to be repeated.
  DT <- DT[DT_keys_no_missing_qtr, roll = 0]
  DT <- DT[order(cid, qtr)]

  # Left-aligned 8-row sum covers rows t..t+7; leading it by one row moves the
  # window to t+1..t+8, i.e. the eight quarters after the current one.
  DT[
    , p_total := shift(
      frollsum(p_total, n = 8, na.rm = FALSE, align = "left"),
      1, type = "lead"
    ),
    by = cid
  ]
  DT[, t_default := fifelse(p_total > 0, 1, 0)]

  DT[, has_accounts := as.numeric(VARNAME > 0)]

  # Accounts in the current or the next quarter.
  # NOTE(review): `> 0` flags the pair when EITHER quarter has accounts. If the
  # intent is accounts in BOTH successive quarters, this should be `== 2` --
  # confirm before changing, since it alters the output.
  DT[
    , has_accounts_2qtr_sum := frollsum(
      has_accounts, n = 2, na.rm = FALSE, align = "left"
    ),
    by = cid
  ]
  DT[, has_accounts_2qtr_sum := as.numeric(has_accounts_2qtr_sum > 0)]

  # Seven pair starts t..t+6, led by one row: pairs starting at t+1..t+7,
  # which together cover quarters t+1..t+8.
  DT[
    , open_acc_has_succesive_2qtr_within_8qtr := shift(
      frollsum(has_accounts_2qtr_sum, n = 7, na.rm = FALSE, align = "left"),
      1, type = "lead"
    ),
    by = cid
  ]
  DT[
    , open_acc_has_succesive_2qtr_within_8qtr :=
      as.numeric(open_acc_has_succesive_2qtr_within_8qtr > 0)
  ]

  # Keep only the originally observed rows. The join columns are named
  # explicitly so this does not rely on DT still carrying its key.
  DT[DT_keys, on = c("cid", "qtr"), nomatch = NULL]
}

# ---- Input location and file inventory ----

path_cluster <- "/home/projects/Credit_Scoring_ML_MersaultMoultonSantucci/"

# Directory holding the input fst files
path_ccp <- paste0(path_cluster, "data_qtr_rand_no_cid_before_consumer_group_var/")

v_file_names <- list.files(path_ccp)

# Columns to read from each fst file
v_vars <- c("cid", "qtr",
            "VARNAME",
            "VARNAME", "VARNAME", "VARNAME", "VARNAME", "VARNAME",
            "VARNAME",
            "VARNAME", "VARNAME", "VARNAME", "VARNAME", "VARNAME", "VARNAME",
            "VARNAME",
            "VARNAME")

# One row per input file, with the quarter (first run of 8 digits) and the
# rand_no_cid partition (the 4 digits directly before the ".fst" extension)
# parsed from the file name. The dot is escaped and the match is anchored at
# the end of the name so that only a literal ".fst" extension qualifies.
df_file_names <- tibble(
  file_name = v_file_names
) %>%
  mutate(
    qtr = ymd(str_extract(file_name, "[0-9]{8}")),
    rand_no_cid = as.numeric(str_extract(file_name, "[0-9]{4}(?=\\.fst$)"))
  ) %>%
  glimpse()

# Partitions processed in this run
v_rand_no_cid <- 0:9
#v_rand_no_cid <- 85:99

# Pre-set the loop variable to the first partition (presumably so the loop
# body can be stepped through interactively); the loop overwrites it.
rand_no_cid_ <- v_rand_no_cid[1]

# Process one rand_no_cid partition at a time: read all of its input files,
# derive the outcome, open-account and consumer-group variables, and write the
# result as one fst file per quarter.
for (rand_no_cid_ in v_rand_no_cid) {

  tic()  # total time for this partition; closed by the final toc() below
  v_file_names_rand_no_cid <- df_file_names %>%
    filter(rand_no_cid == rand_no_cid_) %>%
    pull(file_name)

  # With an empty file list paste0() would return the bare directory path and
  # the read below would be attempted on the directory itself, so stop with a
  # clear message instead.
  if (length(v_file_names_rand_no_cid) == 0) {
    stop(
      "No input files found in ", path_ccp,
      " for rand_no_cid = ", rand_no_cid_
    )
  }

  v_path_file_names <- paste0(path_ccp, v_file_names_rand_no_cid)

  # ~100 seconds
  tic()
  df_raw <- v_path_file_names %>%
    map_df(~read_fst(., columns = v_vars))
  toc()

  tic()
  df <- df_raw %>%
    Remove_Deceased() %>%
    Create_Outcome_and_Open_Account_Variable() %>%
    Assign_Consumer_Group() %>%
    # Rows without a computable outcome are dropped
    filter(!is.na(t_default)) %>%
    # A missing open-account indicator is treated as 0
    mutate(
      open_acc_has_succesive_2qtr_within_8qtr = coalesce(open_acc_has_succesive_2qtr_within_8qtr, 0)
    ) %>%
    as_tibble()
  toc()

  path_out_cluster <- paste0(path_cluster, "data/open_accounts/")

  if (!dir.exists(path_out_cluster)) {
    dir.create(path_out_cluster, recursive = TRUE)
  }

  # Distinct quarters present in the result, in ascending order
  v_qtr <- df %>%
    distinct(qtr) %>%
    arrange(qtr) %>%
    pull(qtr)

  # The partition id is constant within this iteration, so format it once
  rand_no_cid_string <- str_pad(rand_no_cid_, 4, "left", pad = "0")

  # seq_along() yields no iterations when no quarter is left, whereas
  # 1:length() would run with indices 1 and 0. Indexing (instead of looping
  # over v_qtr directly) keeps the class of the quarter values, which a `for`
  # over the vector itself would drop.
  for (i_qtr in seq_along(v_qtr)) {
    qtr_ <- v_qtr[i_qtr]

    qtr_string <- str_replace_all(qtr_, "-", "")

    df_qtr <- df %>%
      filter(qtr == qtr_)

    file_name_out <- paste0(path_out_cluster, "ccp_", qtr_string, "_", rand_no_cid_string, ".fst")
    write_fst(df_qtr, file_name_out)

  }
  print(rand_no_cid_)
  toc()
}
